/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.service.pipeline.impl;

import com.chinamobile.cmss.lakehouse.common.dto.ClusterCreateStatus;
import com.chinamobile.cmss.lakehouse.core.pipeline.CheckHandler;
import com.chinamobile.cmss.lakehouse.core.pipeline.CreateHandler;
import com.chinamobile.cmss.lakehouse.core.pipeline.PipelineContext;
import com.chinamobile.cmss.lakehouse.core.pipeline.ReleaseHandler;
import com.chinamobile.cmss.lakehouse.service.pipeline.PipelineExecutor;

import java.util.List;
import java.util.Map;
import java.util.Objects;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;

/**
 * Default {@link PipelineExecutor} that runs the handler chain registered for the
 * runtime class of a {@link PipelineContext}.
 *
 * <p>Three independent chains exist: create, check and release. Each one is looked up with
 * {@code context.getClass()} as the map key, so a context type without its own entry is
 * treated as having an empty pipeline.
 *
 * <p>Anything thrown by a handler is caught, logged with its stack trace, and treated as a
 * failure of that handler; it is never propagated to the caller.
 */
@Component
@Slf4j
public class PipelineExecutorImpl implements PipelineExecutor {
    /**
     * Create handlers keyed by context class.
     * Reference createPipelineMap in PipelineConfig
     */
    @Resource
    private Map<Class<? extends PipelineContext>, List<? extends CreateHandler<? super PipelineContext>>> createPipelineMap;

    /**
     * Release handlers keyed by context class.
     */
    @Resource
    private Map<Class<? extends PipelineContext>, List<? extends ReleaseHandler<? super PipelineContext>>> releasePipelineMap;

    /**
     * Check handlers keyed by context class.
     */
    @Resource
    private Map<Class<? extends PipelineContext>, List<? extends CheckHandler<? super PipelineContext>>> checkPipelineMap;

    /**
     * Runs the create handlers for the given context in order and stops at the first failure.
     *
     * @param context the pipeline context, must not be {@code null}
     * @return {@code true} if every handler returned {@code true}; {@code false} if no pipeline
     *         is registered for the context class, or a handler returned {@code false} or threw
     * @throws NullPointerException if {@code context} is {@code null}
     */
    @Override
    public boolean createSync(PipelineContext context) {
        Objects.requireNonNull(context, "create context should not be null");
        Class<? extends PipelineContext> dataType = context.getClass();
        List<? extends CreateHandler<? super PipelineContext>> pipeline =
            createPipelineMap.get(dataType);

        if (CollectionUtils.isEmpty(pipeline)) {
            log.error("{} pipeline is empty", dataType.getSimpleName());
            return false;
        }

        boolean lastSuccess = true;

        for (CreateHandler<? super PipelineContext> handler : pipeline) {
            try {
                lastSuccess = handler.handleCreate(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                // Trailing throwable makes SLF4J print the stack trace as well as the message.
                log.error("[{}] process failed，handler={},{}", context.getContextType(),
                    handler.getClass().getSimpleName(), ex.getMessage(), ex);
            }

            if (!lastSuccess) {
                log.error("[{}] process failed，handler={}", context.getContextType(),
                    handler.getClass().getSimpleName());
                // Fail fast: later create steps are not attempted after a failed one.
                break;
            }
        }

        return lastSuccess;
    }

    /**
     * Runs the check handlers for the given context in order and stops at the first failure.
     *
     * @param context the pipeline context, must not be {@code null}
     * @return a status built from the simple class name of the last executed handler and its
     *         outcome (the first failing handler, or the final handler when all succeed);
     *         {@code new ClusterCreateStatus("system", false)} when no pipeline is registered
     *         for the context class
     * @throws NullPointerException if {@code context} is {@code null}
     */
    @Override
    public ClusterCreateStatus checkStatus(PipelineContext context) {
        Objects.requireNonNull(context, "check status context should not be null");
        Class<? extends PipelineContext> dataType = context.getClass();
        List<? extends CheckHandler<? super PipelineContext>> pipeline = checkPipelineMap.get(dataType);
        if (CollectionUtils.isEmpty(pipeline)) {
            log.error("{}  pipeline is empty", dataType.getSimpleName());
            return new ClusterCreateStatus("system", false);
        }

        // Always assigned below because the pipeline holds at least one handler here.
        ClusterCreateStatus result = null;
        for (CheckHandler<? super PipelineContext> handler : pipeline) {
            boolean success;
            try {
                success = handler.handleCheck(context);
            } catch (Throwable ex) {
                success = false;
                // Trailing throwable makes SLF4J print the stack trace as well as the message.
                log.error("[{}] process failed，handler={},{}", context.getContextType(),
                    handler.getClass().getSimpleName(), ex.getMessage(), ex);
            }
            result = new ClusterCreateStatus(handler.getClass().getSimpleName(), success);
            if (!success) {
                log.error("[{}] process failed，handler={}", context.getContextType(),
                    handler.getClass().getSimpleName());
                break;
            }
        }

        return result;
    }

    /**
     * Runs every release handler for the given context, continuing past failures so that the
     * remaining handlers still get a chance to run.
     *
     * @param context the pipeline context, must not be {@code null}
     * @return {@code true} only if every handler returned {@code true}; {@code false} if no
     *         pipeline is registered for the context class, or any handler returned
     *         {@code false} or threw
     * @throws NullPointerException if {@code context} is {@code null}
     */
    @Override
    public boolean releaseSync(PipelineContext context) {
        Objects.requireNonNull(context, "release context should not be null");
        Class<? extends PipelineContext> dataType = context.getClass();

        List<? extends ReleaseHandler<? super PipelineContext>> pipeline =
            releasePipelineMap.get(dataType);
        if (CollectionUtils.isEmpty(pipeline)) {
            log.error("{}  pipeline is empty", dataType.getSimpleName());
            return false;
        }

        // Accumulated across all handlers: a later success must not hide an earlier failure.
        boolean allSuccess = true;
        for (ReleaseHandler<? super PipelineContext> handler : pipeline) {
            boolean success;
            try {
                success = handler.handleRelease(context);
            } catch (Throwable ex) {
                success = false;
                // Trailing throwable makes SLF4J print the stack trace as well as the message.
                log.error("[{}] process failed，handler={},{}", context.getContextType(),
                    handler.getClass().getSimpleName(), ex.getMessage(), ex);
            }

            if (!success) {
                allSuccess = false;
                // No break: release is best-effort, the remaining handlers still run.
                log.warn("[{}] process failed，handler={}", context.getContextType(),
                    handler.getClass().getSimpleName());
            }
        }

        return allSuccess;
    }

}
